// Copyright 2021-Present Datadog, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::num::NonZeroU32;

use anyhow::{Context, bail};
use fnv::FnvHashSet;
use quickwit_proto::types::DocMappingUid;
use quickwit_query::create_default_quickwit_tokenizer_manager;
use quickwit_query::query_ast::QueryAst;
use quickwit_query::tokenizers::TokenizerManager;
use serde::{Deserialize, Serialize};
use serde_json::{self, Value as JsonValue};
use serde_json_borrow::Map as BorrowedJsonMap;
use tantivy::TantivyDocument as Document;
use tantivy::query::Query;
use tantivy::schema::{Field, FieldType, INDEXED, OwnedValue as TantivyValue, STORED, Schema};

use super::DocMapperBuilder;
use super::field_mapping_entry::RAW_TOKENIZER_NAME;
use super::field_presence::populate_field_presence;
use super::tantivy_val_to_json::tantivy_value_to_json;
use crate::doc_mapper::mapping_tree::{
    JsonValueIterator, MappingNode, MappingNodeRoot, build_field_path_from_str, build_mapping_tree,
    map_primitive_json_to_tantivy,
};
use crate::doc_mapper::{FieldMappingType, JsonObject, Partition};
use crate::query_builder::build_query;
use crate::routing_expression::RoutingExpr;
use crate::{
    Cardinality, DOCUMENT_SIZE_FIELD_NAME, DYNAMIC_FIELD_NAME, DocMapping, DocParsingError,
    FIELD_PRESENCE_FIELD_NAME, Mode, ModeType, NamedField, QueryParserError, SOURCE_FIELD_NAME,
    TokenizerEntry, WarmupInfo,
};

// The field-presence field is always the first field added to the schema, so its
// field id is 0. `TryFrom<DocMapperBuilder>` asserts this invariant at build time.
const FIELD_PRESENCE_FIELD: Field = Field::from_field_id(0u32);

/// A `DocMapper` defines a set of rules to map json fields
/// to tantivy index fields.
///
/// The main rules are defined by the field mappings.
#[derive(Clone, Serialize, Deserialize)]
#[serde(into = "DocMapperBuilder", try_from = "DocMapperBuilder")]
pub struct DocMapper {
    /// The UID of the doc mapping.
    doc_mapping_uid: DocMappingUid,
    /// Field in which the source should be stored.
    /// This field is only valid when using the schema associated with the default
    /// doc mapper, and therefore cannot be used in the `query` method.
    source_field: Option<Field>,
    /// Indexes field presence. It is necessary to enable this in order to run exists
    /// queries.
    index_field_presence: bool,
    /// Field in which the dynamically mapped fields should be stored.
    /// This field is only valid when using the schema associated with the default
    /// doc mapper, and therefore cannot be used in the `query` method.
    dynamic_field: Option<Field>,
    /// Field in which the len of the source document is stored as a fast field.
    document_size_field: Option<Field>,
    /// Default list of field names used for search.
    default_search_field_names: Vec<String>,
    /// Timestamp field name.
    timestamp_field_name: Option<String>,
    /// Timestamp field path (name parsed into its dot-separated segments).
    timestamp_field_path: Option<Vec<String>>,
    /// Root node of the field mapping tree.
    /// See [`MappingNode`].
    field_mappings: MappingNode,
    /// Concat fields which need to learn about any element put in dynamic_field.
    concatenate_dynamic_fields: Vec<Field>,
    /// Schema generated by the store source and field mappings parameters.
    schema: Schema,
    /// List of field names used for tagging.
    tag_field_names: BTreeSet<String>,
    /// The partition key is a DSL used to route documents
    /// into specific splits.
    partition_key: RoutingExpr,
    /// Maximum number of partitions.
    max_num_partitions: NonZeroU32,
    /// Defines how unmapped fields should be handled.
    mode: Mode,
    /// User-defined tokenizers.
    tokenizer_entries: Vec<TokenizerEntry>,
    /// Tokenizer manager.
    tokenizer_manager: TokenizerManager,
}

fn validate_timestamp_field(
    timestamp_field_path: &str,
    mapping_root_node: &MappingNode,
) -> anyhow::Result<()> {
    if timestamp_field_path.starts_with('.') || timestamp_field_path.starts_with("\\.") {
        bail!("timestamp field `{timestamp_field_path}` should not start with a `.`");
    }
    if timestamp_field_path.ends_with('.') {
        bail!("timestamp field `{timestamp_field_path}` should not end with a `.`");
    }
    let Some(timestamp_field_type) =
        mapping_root_node.find_field_mapping_type(timestamp_field_path)
    else {
        bail!("could not find timestamp field `{timestamp_field_path}` in field mappings");
    };
    if let FieldMappingType::DateTime(date_time_option, cardinality) = &timestamp_field_type {
        if cardinality != &Cardinality::SingleValued {
            bail!("timestamp field `{timestamp_field_path}` should be single-valued");
        }
        if !date_time_option.fast {
            bail!("timestamp field `{timestamp_field_path}` should be a fast field");
        }
    } else {
        bail!("timestamp field `{timestamp_field_path}` should be a datetime field");
    }
    Ok(())
}

impl From<DocMapper> for DocMapperBuilder {
    fn from(default_doc_mapper: DocMapper) -> Self {
        let partition_key_str = default_doc_mapper.partition_key.to_string();
        let partition_key_opt: Option<String> = if !partition_key_str.is_empty() {
            Some(partition_key_str)
        } else {
            None
        };
        let doc_mapping = DocMapping {
            doc_mapping_uid: default_doc_mapper.doc_mapping_uid,
            mode: default_doc_mapper.mode,
            field_mappings: default_doc_mapper.field_mappings.into(),
            timestamp_field: default_doc_mapper.timestamp_field_name,
            tag_fields: default_doc_mapper.tag_field_names,
            partition_key: partition_key_opt,
            max_num_partitions: default_doc_mapper.max_num_partitions,
            index_field_presence: default_doc_mapper.index_field_presence,
            store_document_size: default_doc_mapper.document_size_field.is_some(),
            store_source: default_doc_mapper.source_field.is_some(),
            tokenizers: default_doc_mapper.tokenizer_entries,
        };
        Self {
            doc_mapping,
            default_search_fields: default_doc_mapper.default_search_field_names,
            legacy_type_tag: None,
        }
    }
}

impl TryFrom<DocMapperBuilder> for DocMapper {
    type Error = anyhow::Error;

    /// Builds and validates a `DocMapper` from its serializable builder form.
    ///
    /// This resolves the tantivy schema, registers custom tokenizers, and
    /// validates the default search fields, tag fields, timestamp field and
    /// partition key, failing with a descriptive error on any invalid
    /// configuration. This is the `Deserialize` path of the
    /// `#[serde(try_from = ...)]` round trip.
    fn try_from(builder: DocMapperBuilder) -> anyhow::Result<DocMapper> {
        let mut schema_builder = Schema::builder();

        // We want the field ID of the field presence field to be 0, so we add it to the schema
        // first.
        let field_presence_field = schema_builder.add_u64_field(FIELD_PRESENCE_FIELD_NAME, INDEXED);
        assert_eq!(field_presence_field, FIELD_PRESENCE_FIELD);

        let doc_mapping = builder.doc_mapping;

        // In dynamic mode, fields absent from the explicit mappings are routed
        // to a dedicated JSON field.
        let dynamic_field = if let Mode::Dynamic(json_options) = &doc_mapping.mode {
            Some(schema_builder.add_json_field(DYNAMIC_FIELD_NAME, json_options.clone()))
        } else {
            None
        };
        // Optional fast field holding the byte length of the original document.
        let document_size_field = if doc_mapping.store_document_size {
            let document_size_field_options = tantivy::schema::NumericOptions::default().set_fast();
            Some(
                schema_builder.add_u64_field(DOCUMENT_SIZE_FIELD_NAME, document_size_field_options),
            )
        } else {
            None
        };
        // Optional stored (not indexed) copy of the full source document.
        let source_field = if doc_mapping.store_source {
            Some(schema_builder.add_json_field(SOURCE_FIELD_NAME, STORED))
        } else {
            None
        };
        let MappingNodeRoot {
            field_mappings,
            concatenate_dynamic_fields,
        } = build_mapping_tree(&doc_mapping.field_mappings, &mut schema_builder)?;
        if !concatenate_dynamic_fields.is_empty() && dynamic_field.is_none() {
            bail!("concatenate field has `include_dynamic_fields` set, but index isn't dynamic");
        }
        // Validate the timestamp field against the mappings and pre-parse its
        // dot-separated path for fast access at ingestion time.
        let timestamp_field_path = if let Some(timestamp_field_name) = &doc_mapping.timestamp_field
        {
            validate_timestamp_field(timestamp_field_name, &field_mappings)?;
            Some(build_field_path_from_str(timestamp_field_name))
        } else {
            None
        };
        let schema = schema_builder.build();

        // Register user-defined tokenizers on top of the built-in ones,
        // rejecting duplicates and clashes with built-in names.
        let tokenizer_manager = create_default_quickwit_tokenizer_manager();
        let mut custom_tokenizer_names = HashSet::new();
        for tokenizer_config_entry in &doc_mapping.tokenizers {
            if custom_tokenizer_names.contains(&tokenizer_config_entry.name) {
                bail!(
                    "duplicated custom tokenizer: `{}`",
                    tokenizer_config_entry.name
                );
            }
            if tokenizer_manager
                .get_tokenizer(&tokenizer_config_entry.name)
                .is_some()
            {
                bail!(
                    "custom tokenizer name `{}` should be different from built-in tokenizer's \
                     names",
                    tokenizer_config_entry.name
                );
            }
            let tokenizer = tokenizer_config_entry
                .config
                .text_analyzer()
                .map_err(|error| {
                    anyhow::anyhow!(
                        "failed to build tokenizer `{}`: {:?}",
                        tokenizer_config_entry.name,
                        error
                    )
                })?;
            let does_lowercasing = tokenizer_config_entry
                .config
                .filters
                .iter()
                .any(|filter| matches!(filter, crate::TokenFilterType::LowerCaser));
            tokenizer_manager.register(&tokenizer_config_entry.name, tokenizer, does_lowercasing);
            custom_tokenizer_names.insert(&tokenizer_config_entry.name);
        }
        validate_fields_tokenizers(&schema, &tokenizer_manager)?;

        // Resolve default search fields: each must be unique, exist in the
        // schema (possibly under the dynamic field), and be indexed.
        let mut default_search_field_names = Vec::new();
        for default_search_field_name in &builder.default_search_fields {
            if default_search_field_names.contains(default_search_field_name) {
                bail!(
                    "duplicated default search field: `{}`",
                    default_search_field_name
                )
            }
            let (default_search_field, _json_path) = schema
                .find_field_with_default(default_search_field_name, dynamic_field)
                .with_context(|| {
                    format!("unknown default search field `{default_search_field_name}`")
                })?;
            if !schema.get_field_entry(default_search_field).is_indexed() {
                bail!("default search field `{default_search_field_name}` is not indexed",);
            }
            default_search_field_names.push(default_search_field_name.clone());
        }

        // Resolve tag fields
        for tag_field_name in &doc_mapping.tag_fields {
            validate_tag(tag_field_name, &schema)?;
        }

        let partition_key_expr: &str = doc_mapping.partition_key.as_deref().unwrap_or("");
        let partition_key = RoutingExpr::new(partition_key_expr).with_context(|| {
            format!("failed to interpret the partition key: `{partition_key_expr}`")
        })?;

        // If valid, partition key fields should be considered as tags.
        // Invalid ones are silently skipped (best effort), unlike explicit tag
        // fields which fail hard above.
        let mut tag_field_names = doc_mapping.tag_fields;

        for partition_key in partition_key.field_names() {
            if validate_tag(&partition_key, &schema).is_ok() {
                tag_field_names.insert(partition_key);
            }
        }
        Ok(DocMapper {
            doc_mapping_uid: doc_mapping.doc_mapping_uid,
            schema,
            index_field_presence: doc_mapping.index_field_presence,
            source_field,
            dynamic_field,
            document_size_field,
            default_search_field_names,
            timestamp_field_name: doc_mapping.timestamp_field,
            timestamp_field_path,
            field_mappings,
            concatenate_dynamic_fields,
            tag_field_names,
            partition_key,
            max_num_partitions: doc_mapping.max_num_partitions,
            mode: doc_mapping.mode,
            tokenizer_entries: doc_mapping.tokenizers,
            tokenizer_manager,
        })
    }
}

/// Checks that a given field name is a valid candidate for a tag.
///
/// The conditions are:
/// - the field must be str, u64, or i64
/// - if str, the field must use the `raw` tokenizer for indexing.
/// - the field must be indexed.
fn validate_tag(tag_field_name: &str, schema: &Schema) -> Result<(), anyhow::Error> {
    // Reject paths with a leading (possibly escaped) or trailing dot.
    if tag_field_name.starts_with('.') || tag_field_name.starts_with("\\.") {
        bail!("tag field `{tag_field_name}` should not start with a `.`");
    }
    if tag_field_name.ends_with('.') {
        bail!("tag field `{tag_field_name}` should not end with a `.`");
    }
    let tag_field = schema
        .get_field(tag_field_name)
        .with_context(|| format!("unknown tag field: `{tag_field_name}`"))?;
    let tag_field_type = schema.get_field_entry(tag_field).field_type();
    match tag_field_type {
        FieldType::Str(text_options) => {
            // Text tags must be indexed verbatim: any other tokenizer would
            // split or normalize values and break exact tag matching.
            let uses_raw_tokenizer = text_options
                .get_indexing_options()
                .map(|text_indexing: &tantivy::schema::TextFieldIndexing| text_indexing.tokenizer())
                == Some(RAW_TOKENIZER_NAME);
            if !uses_raw_tokenizer {
                bail!("tags collection is only allowed on text fields with the `raw` tokenizer");
            }
        }
        FieldType::U64(_) | FieldType::I64(_) => {
            // u64 and i64 are accepted as tags.
        }
        _ => {
            // We avoid the bytes / bool / f64 types: they generally make poor
            // tags and have several textual representations, which invites bugs.
            //
            // Tags rely heavily on string manipulation, and we want to avoid
            // a "zero results because you searched for 0.100 instead of 0.1",
            // or `myflag:1` / `myflag:True` instead of `myflag:true`.
            bail!(
                "tags collection is not allowed on `{}` fields",
                tag_field_type.value_type().name().to_lowercase()
            )
        }
    }
    if !tag_field_type.is_indexed() {
        bail!(
            "tag fields are required to be indexed. (`{}` is not configured as indexed)",
            tag_field_name
        )
    }
    Ok(())
}

/// Checks that every text/json field in the schema refers to a tokenizer
/// registered in the tokenizer manager.
fn validate_fields_tokenizers(
    schema: &Schema,
    tokenizer_manager: &TokenizerManager,
) -> Result<(), anyhow::Error> {
    for (_field, field_entry) in schema.fields() {
        // Only str and json fields carry text indexing options (and thus a
        // tokenizer name); every other field type is skipped.
        let tokenizer_name_opt = match field_entry.field_type() {
            FieldType::Str(options) => options
                .get_indexing_options()
                .map(|text_options: &tantivy::schema::TextFieldIndexing| text_options.tokenizer()),
            FieldType::JsonObject(options) => options
                .get_text_indexing_options()
                .map(|text_options: &tantivy::schema::TextFieldIndexing| text_options.tokenizer()),
            _ => None,
        };
        let Some(tokenizer_name) = tokenizer_name_opt else {
            continue;
        };
        if tokenizer_manager.get_tokenizer(tokenizer_name).is_none() {
            bail!(
                "unknown tokenizer `{}` for field `{}`",
                tokenizer_name,
                field_entry.name()
            );
        }
    }
    Ok(())
}

impl std::fmt::Debug for DocMapper {
    /// Shows only a compact subset of the mapper: the schema and mapping tree
    /// are too verbose to be useful in debug output.
    fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
        let mut debug_struct = formatter.debug_struct("DocMapper");
        debug_struct.field("store_source", &self.source_field.is_some());
        debug_struct.field(
            "default_search_field_names",
            &self.default_search_field_names,
        );
        debug_struct.field("timestamp_field_name", &self.timestamp_field_name());
        // TODO: complete it.
        debug_struct.finish()
    }
}

/// Removes `key` from the named document and returns its value as a JSON map.
///
/// Returns `Ok(None)` when the key is absent or holds no value, and an error
/// when the key maps to several values or to a non-object value.
fn extract_single_obj(
    doc: &mut BTreeMap<String, Vec<TantivyValue>>,
    key: &str,
) -> anyhow::Result<Option<serde_json::Map<String, JsonValue>>> {
    let Some(mut values) = doc.remove(key) else {
        return Ok(None);
    };
    if values.len() > 1 {
        bail!(
            "invalid named document. there are more than 1 value associated to the `{key}` field"
        );
    }
    match values.pop() {
        Some(TantivyValue::Object(json_obj)) => {
            let converted_obj = json_obj
                .into_iter()
                .map(|(field_name, field_value)| (field_name, tantivy_value_to_json(field_value)))
                .collect();
            Ok(Some(converted_obj))
        }
        Some(_) => bail!("the `{key}` value has to be a json object"),
        None => Ok(None),
    }
}

impl DocMapper {
    /// Returns the unique identifier of the doc mapping.
    pub fn doc_mapping_uid(&self) -> DocMappingUid {
        self.doc_mapping_uid
    }

    /// Validates a JSON object according to the doc mapper.
    ///
    /// Checks the document against the field mappings (rejecting unmapped
    /// fields in strict mode) and, when a timestamp field is configured,
    /// verifies that it is present and is a string or a number.
    pub fn validate_json_obj(&self, json_obj: &BorrowedJsonMap) -> Result<(), DocParsingError> {
        let is_strict = self.mode.mode_type() == ModeType::Strict;
        let mut field_path = Vec::new();
        self.field_mappings
            .validate_from_json(json_obj, is_strict, &mut field_path)?;
        if let Some(timestamp_field_path) = &self.timestamp_field_path {
            let missing_ts_field =
                || DocParsingError::RequiredField("timestamp field is required".to_string());
            match &timestamp_field_path[..] {
                // NOTE(review): an empty path looks unreachable when a
                // timestamp field is configured -- confirm.
                [] => (),
                [single_part] => {
                    let obj = json_obj.get(single_part).ok_or_else(missing_ts_field)?;
                    if !(obj.is_string() || obj.is_number()) {
                        return Err(missing_ts_field());
                    }
                }
                [first_part, more_part @ ..] => {
                    // Walk down nested objects along the field path; any
                    // missing segment or non-object intermediate is an error.
                    let mut obj = json_obj.get(first_part).ok_or_else(missing_ts_field)?;
                    for part in more_part {
                        obj = obj
                            .as_object()
                            .ok_or_else(missing_ts_field)?
                            .get(part)
                            .ok_or_else(missing_ts_field)?;
                    }
                    if !(obj.is_string() || obj.is_number()) {
                        return Err(missing_ts_field());
                    }
                }
            };
        }
        Ok(())
    }

    /// Parses a JSON byte slice into a tantivy [`Document`].
    pub fn doc_from_json_bytes(
        &self,
        json_doc: &[u8],
    ) -> Result<(Partition, Document), DocParsingError> {
        let json_obj: JsonObject = serde_json::from_slice(json_doc).map_err(|_| {
            // On failure, include a short sample of the document in the error
            // for easier debugging.
            let json_doc_sample: String = std::str::from_utf8(json_doc)
                .map(|doc_str| doc_str.chars().take(20).chain("...".chars()).collect())
                .unwrap_or_else(|_| "document contains some invalid UTF-8 characters".to_string());
            DocParsingError::NotJsonObject(json_doc_sample)
        })?;
        self.doc_from_json_obj(json_obj, json_doc.len() as u64)
    }

    /// Parses a JSON string into a tantivy [`Document`].
    pub fn doc_from_json_str(
        &self,
        json_doc: &str,
    ) -> Result<(Partition, Document), DocParsingError> {
        let json_obj: JsonObject = serde_json::from_str(json_doc).map_err(|_| {
            let json_doc_sample: String = json_doc.chars().take(20).chain("...".chars()).collect();
            DocParsingError::NotJsonObject(json_doc_sample)
        })?;
        self.doc_from_json_obj(json_obj, json_doc.len() as u64)
    }

    /// Transforms a JSON object into a tantivy [`Document`] according to the rules
    /// defined for the `DocMapper`.
    ///
    /// `document_len` is the byte length of the original document; it is stored
    /// in the document-size fast field when that option is enabled.
    pub fn doc_from_json_obj(
        &self,
        json_obj: JsonObject,
        document_len: u64,
    ) -> Result<(Partition, Document), DocParsingError> {
        // The partition is derived from the routing expression evaluated on the
        // raw JSON object.
        let partition: Partition = self.partition_key.eval_hash(&json_obj);

        let mut dynamic_json_obj = serde_json::Map::default();
        let mut field_path = Vec::new();
        let mut document = Document::default();

        // When source storage is enabled, a full copy of the document is kept
        // in the `_source` field.
        if let Some(source_field) = self.source_field {
            document.add_object(
                source_field,
                json_obj
                    .clone()
                    .into_iter()
                    .map(|(key, val)| (key, TantivyValue::from(val)))
                    .collect(),
            );
        }

        // Map explicitly declared fields; values not covered by the mappings
        // are collected into `dynamic_json_obj` (used below in dynamic mode).
        let mode = self.mode.mode_type();
        self.field_mappings.doc_from_json(
            json_obj,
            mode,
            &mut document,
            &mut field_path,
            &mut dynamic_json_obj,
        )?;

        if let Some(dynamic_field) = self.dynamic_field
            && !dynamic_json_obj.is_empty()
        {
            // Concatenate fields that opted into dynamic values receive every
            // primitive value found anywhere in the dynamic object.
            if !self.concatenate_dynamic_fields.is_empty() {
                let json_obj_values =
                    JsonValueIterator::new(serde_json::Value::Object(dynamic_json_obj.clone()))
                        .flat_map(map_primitive_json_to_tantivy);

                for value in json_obj_values {
                    for concatenate_dynamic_field in self.concatenate_dynamic_fields.iter() {
                        document.add_field_value(*concatenate_dynamic_field, &value);
                    }
                }
            }
            document.add_object(
                dynamic_field,
                dynamic_json_obj
                    .into_iter()
                    .map(|(key, val)| (key, TantivyValue::from(val)))
                    .collect(),
            );
        }

        if let Some(document_size_field) = self.document_size_field {
            document.add_u64(document_size_field, document_len);
        }

        // Field presence is indexed as a set of path hashes, one per present
        // field, enabling `exists` queries.
        if self.index_field_presence {
            let field_presence_hashes: FnvHashSet<u64> =
                populate_field_presence(&document, &self.schema, true);
            for field_presence_hash in field_presence_hashes {
                document.add_field_value(FIELD_PRESENCE_FIELD, &field_presence_hash);
            }
        }
        Ok((partition, document))
    }

    /// Converts a tantivy named Document to the json format.
    ///
    /// Tantivy does not have any notion of cardinality nor object.
    /// It is therefore up to the `DocMapper` to pick a tantivy named document
    /// and convert it into a final quickwit document.
    ///
    /// Because this operation is dependent on the `DocMapper`, this
    /// method is meant to be called on the root node using the most recent
    /// `DocMapper`. This ensures that the different hits are formatted according
    /// to the same schema.
    pub fn doc_to_json(
        &self,
        mut named_doc: BTreeMap<String, Vec<TantivyValue>>,
    ) -> anyhow::Result<serde_json::Map<String, JsonValue>> {
        // Start from the dynamic field content (if any), then overlay the
        // explicitly mapped fields on top of it.
        let mut doc_json =
            extract_single_obj(&mut named_doc, DYNAMIC_FIELD_NAME)?.unwrap_or_default();
        let mut field_path: Vec<&str> = Vec::new();
        self.field_mappings
            .populate_json(&mut named_doc, &mut field_path, &mut doc_json);
        if let Some(source_json) = extract_single_obj(&mut named_doc, SOURCE_FIELD_NAME)? {
            doc_json.insert(
                SOURCE_FIELD_NAME.to_string(),
                JsonValue::Object(source_json),
            );
        }
        if matches!(
            self.mode,
            Mode::Dynamic(ref opt) if opt.stored
        ) {
            // if we are in dynamic mode and there are other fields lefts, we should print them.
            // They probably come from older schemas when these fields had a dedicated entry
            'field: for (key, mut value) in named_doc {
                if key.starts_with('_') {
                    // this is an internal field, not meant to be shown
                    continue 'field;
                }
                // Fields whose name cannot be parsed as a path are skipped
                // rather than reported as an error (best effort).
                let Ok(path) = crate::routing_expression::parse_field_name(&key) else {
                    continue 'field;
                };
                let Some((last_segment, path)) = path.split_last() else {
                    continue 'field;
                };
                // Descend into (or create) the intermediate objects; bail out
                // of this field if an existing value is not an object.
                let mut map = &mut doc_json;
                for segment in path {
                    let obj = if map.contains_key(&**segment) {
                        // we have to do this strange dance to please the borrowchecker
                        map.get_mut(&**segment).unwrap()
                    } else {
                        map.insert(segment.to_string(), serde_json::Map::new().into());
                        map.get_mut(&**segment).unwrap()
                    };
                    let JsonValue::Object(inner_map) = obj else {
                        continue 'field;
                    };
                    map = inner_map;
                }
                // Values already populated by the current mappings win over
                // leftovers from older schemas.
                map.entry(&**last_segment).or_insert_with(|| {
                    if value.len() == 1 {
                        tantivy_value_to_json(value.pop().unwrap())
                    } else {
                        JsonValue::Array(value.into_iter().map(tantivy_value_to_json).collect())
                    }
                });
            }
        }

        Ok(doc_json)
    }

    /// Returns the query.
    ///
    /// Considering schema evolution, splits within an index can have different schema
    /// over time. So `split_schema` is the schema of the split the query is targeting.
    pub fn query(
        &self,
        split_schema: Schema,
        query_ast: &QueryAst,
        with_validation: bool,
    ) -> Result<(Box<dyn Query>, WarmupInfo), QueryParserError> {
        build_query(
            query_ast,
            split_schema,
            self.tokenizer_manager(),
            &self.default_search_field_names[..],
            with_validation,
        )
    }

    /// Returns the list of search fields to search into, when no field is specified.
    /// (See `UserInputQuery`).
    pub fn default_search_fields(&self) -> &[String] {
        &self.default_search_field_names
    }

    /// Returns the schema.
    ///
    /// Considering schema evolution, splits within an index can have different schema
    /// over time. The schema returned here represents the most up-to-date schema of the index.
    pub fn schema(&self) -> Schema {
        self.schema.clone()
    }

    /// Returns the timestamp field name.
    pub fn timestamp_field_name(&self) -> Option<&str> {
        self.timestamp_field_name.as_deref()
    }

    /// Returns the tag `NameField`s on the current schema.
    /// Returns an error if a tag field is not found in this schema.
    pub fn tag_named_fields(&self) -> anyhow::Result<Vec<NamedField>> {
        let index_schema = self.schema();
        self.tag_field_names()
            .iter()
            .map(|field_name| {
                index_schema
                    .get_field(field_name)
                    .context(format!("field `{field_name}` must exist in the schema"))
                    .map(|field| NamedField {
                        name: field_name.clone(),
                        field,
                        field_type: index_schema.get_field_entry(field).field_type().clone(),
                    })
            })
            .collect::<Result<Vec<_>, _>>()
    }

    /// Returns the set of tag field names (a clone of the internal set).
    pub fn tag_field_names(&self) -> BTreeSet<String> {
        self.tag_field_names.clone()
    }

    /// Returns the maximum number of partitions.
    pub fn max_num_partitions(&self) -> NonZeroU32 {
        self.max_num_partitions
    }

    /// Returns the tokenizer manager.
    pub fn tokenizer_manager(&self) -> &TokenizerManager {
        &self.tokenizer_manager
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};
    use std::iter::zip;

    use itertools::Itertools;
    use quickwit_common::PathHasher;
    use quickwit_query::query_ast::query_ast_from_user_text;
    use serde_json::{self, Value as JsonValue, json};
    use tantivy::schema::{
        FieldType, IndexRecordOption, OwnedValue as TantivyValue, OwnedValue, Type, Value,
    };

    use super::DocMapper;
    use crate::doc_mapper::field_mapping_entry::{DEFAULT_TOKENIZER_NAME, RAW_TOKENIZER_NAME};
    use crate::{
        DOCUMENT_SIZE_FIELD_NAME, DYNAMIC_FIELD_NAME, DocMapperBuilder, DocParsingError,
        FIELD_PRESENCE_FIELD_NAME, SOURCE_FIELD_NAME,
    };

    fn example_json_doc_value() -> JsonValue {
        serde_json::json!({
            "timestamp": 1586960586i64,
            "body": "20200415T072306-0700 INFO This is a great log",
            "response_date2": "2021-12-19T16:39:57+00:00",
            "response_date": "2021-12-19T16:39:57Z",
            "response_time": 2.3,
            "response_payload": "YWJj",
            "owner": "foo",
            "isImportant": false,
            "attributes": {
                "server": "ABC",
                "tags": [22, 23],
                "server.status": ["200", "201"],
                "server.payload": ["YQ==", "Yg=="]
            }
        })
    }

    const EXPECTED_JSON_PATHS_AND_VALUES: &str = r#"{
            "timestamp": ["2020-04-15T14:23:06Z"],
            "body": ["20200415T072306-0700 INFO This is a great log"],
            "response_date": ["2021-12-19T16:39:57Z"],
            "response_time": [2.3],
            "response_payload": ["YWJj"],
            "owner": ["foo"],
            "isImportant": [false],
            "body_other_tokenizer": ["20200415T072306-0700 INFO This is a great log"],
            "attributes.server": ["ABC"],
            "attributes.server\\.payload": ["YQ==", "Yg=="],
            "attributes.tags": [22, 23],
            "attributes.server\\.status": ["200", "201"]
        }"#;

    #[test]
    fn test_json_deserialize() -> anyhow::Result<()> {
        let config = crate::default_doc_mapper_for_test();
        assert!(config.source_field.is_some());
        let mut default_search_field_names: Vec<String> = config.default_search_field_names;
        default_search_field_names.sort();
        assert_eq!(
            default_search_field_names,
            ["attributes.server", r"attributes.server\.status", "body"]
        );
        assert_eq!(config.field_mappings.num_fields(), 10);
        Ok(())
    }

    #[test]
    fn test_parsing_document() {
        // Parse the example document and check every produced (field, value)
        // pair against the expected json paths, plus the field-presence hashes.
        let json_doc = example_json_doc_value();
        let doc_mapper = crate::default_doc_mapper_for_test();
        let (_, document) = doc_mapper
            .doc_from_json_obj(json_doc.as_object().unwrap().clone(), 0)
            .unwrap();
        let schema = doc_mapper.schema();
        // 9 property entry + 1 field "_source" + 2 fields values for "tags" field
        // + 2 values inf "server.status" field + 2 values in "server.payload" field
        // + 7 values for field presence
        assert_eq!(document.len(), 23);
        let expected_values_by_path: HashMap<String, JsonValue> =
            serde_json::from_str(EXPECTED_JSON_PATHS_AND_VALUES).unwrap();
        let mut presence_hashes: HashSet<u64> = HashSet::new();
        for (field, field_value) in document.field_values() {
            let owned: OwnedValue = field_value.into();
            let field_name = schema.get_field_name(field);
            if field_name == SOURCE_FIELD_NAME {
                // some part of aws-sdk enables `preserve_order` on serde_json.
                // to get "normal" equality, we are forced to recreate the json object
                // with sorted keys.
                let sorted_source: serde_json::Map<_, _> = json_doc
                    .as_object()
                    .unwrap()
                    .clone()
                    .into_iter()
                    .sorted_by(|left, right| left.0.cmp(&right.0))
                    .collect();
                assert_eq!(
                    tantivy::schema::OwnedValue::from(field_value.as_value()),
                    tantivy::schema::OwnedValue::from(sorted_source)
                );
            } else if field_name == DYNAMIC_FIELD_NAME {
                assert_eq!(
                    serde_json::to_string(&owned).unwrap(),
                    r#"{"response_date2":"2021-12-19T16:39:57Z"}"#
                );
            } else if field_name == FIELD_PRESENCE_FIELD_NAME {
                presence_hashes.insert(field_value.as_u64().unwrap());
            } else {
                let serialized = serde_json::to_string(&owned).unwrap();
                let found = expected_values_by_path[field_name]
                    .as_array()
                    .unwrap()
                    .iter()
                    .any(|expected| format!("{expected}") == serialized);
                assert!(
                    found,
                    "Could not find: {serialized:?} in {expected_values_by_path:?}"
                );
            }
        }
        assert_eq!(presence_hashes.len(), 7);
        let timestamp_field = schema.get_field("timestamp").unwrap();
        let body_field = schema.get_field("body").unwrap();
        let attributes_field = schema.get_field("attributes.server").unwrap();
        // `timestamp` is absent from the example document: no presence hash.
        assert!(!presence_hashes.contains(&PathHasher::hash_path(&[
            &timestamp_field.field_id().to_le_bytes()[..]
        ])));
        assert!(presence_hashes.contains(&PathHasher::hash_path(&[
            &body_field.field_id().to_le_bytes()[..]
        ])));
        assert!(presence_hashes.contains(&PathHasher::hash_path(&[
            &attributes_field.field_id().to_le_bytes()[..]
        ])));
    }

    #[test]
    fn test_accept_parsing_document_with_unknown_fields_and_missing_fields() {
        // Unknown fields are tolerated and missing mapped fields are not required.
        let json_doc = r#"{
                "timestamp": 1586960586000,
                "unknown_field": "20200415T072306-0700 INFO This is a great log",
                "response_date": "2021-12-19T16:39:57+00:00",
                "response_time": 12,
                "response_payload": "YWJj"
            }"#;
        crate::default_doc_mapper_for_test()
            .doc_from_json_str(json_doc)
            .unwrap();
    }

    #[test]
    fn test_fail_to_parse_document_with_wrong_cardinality() -> anyhow::Result<()> {
        // A single-valued text field must reject a json array value.
        let doc_mapper = crate::default_doc_mapper_for_test();
        let parse_err = doc_mapper
            .doc_from_json_str(
                r#"{
                "timestamp": 1586960586000,
                "body": ["text 1", "text 2"]
            }"#,
            )
            .unwrap_err();
        assert_eq!(
            parse_err,
            DocParsingError::MultiValuesNotSupported("body".to_owned())
        );
        Ok(())
    }

    #[test]
    fn test_fail_to_parse_document_with_wrong_value() -> anyhow::Result<()> {
        // A text field must reject a json number value.
        let doc_mapper = crate::default_doc_mapper_for_test();
        let parse_err = doc_mapper
            .doc_from_json_str(
                r#"{
                "timestamp": 1586960586000,
                "body": 1
            }"#,
            )
            .unwrap_err();
        assert_eq!(
            parse_err,
            DocParsingError::ValueError("body".to_owned(), "expected string, got `1`".to_owned())
        );
        Ok(())
    }

    #[test]
    fn test_timestamp_field_in_object_is_valid() {
        // A timestamp nested inside an object is addressable with a dotted path,
        // both from json and yaml mapper definitions.
        let json_mapper = r#"{
            "field_mappings": [
                {
                    "name": "some_obj",
                    "type": "object",
                    "field_mappings": [
                        {
                            "name": "timestamp",
                            "type": "datetime",
                            "fast": true
                        }
                    ]
                }
            ],
            "timestamp_field": "some_obj.timestamp"
        }"#;
        serde_json::from_str::<DocMapper>(json_mapper).unwrap();

        let yaml_mapper = r#"
            field_mappings:
              - name: some_obj
                type: object
                field_mappings:
                  - name: timestamp
                    type: datetime
                    fast: true
            timestamp_field: some_obj.timestamp
        "#;
        serde_yaml::from_str::<DocMapper>(yaml_mapper).unwrap();
    }

    #[test]
    fn test_timestamp_field_with_dots_in_its_name_is_valid() {
        // Dots in a field name can be escaped with `\.` in the timestamp path,
        // both from json and yaml mapper definitions.
        let json_mapper = r#"{
            "field_mappings": [
                {
                    "name": "my.timestamp",
                    "type": "datetime",
                    "fast": true
                }
            ],
            "timestamp_field": "my\\.timestamp"
        }"#;
        serde_json::from_str::<DocMapper>(json_mapper).unwrap();

        let yaml_mapper = r#"
            field_mappings:
              - name: my.timestamp
                type: datetime
                fast: true
            timestamp_field: "my\\.timestamp"
        "#;
        serde_yaml::from_str::<DocMapper>(yaml_mapper).unwrap();
    }

    #[test]
    fn test_timestamp_field_that_start_with_dot_is_invalid() {
        // Helper: deserialize a mapper definition and return its error message.
        let deser_err = |mapper_json: &str| {
            serde_json::from_str::<DocMapper>(mapper_json)
                .unwrap_err()
                .to_string()
        };
        // Leading dot, unescaped.
        assert_eq!(
            deser_err(
                r#"{
                "field_mappings": [
                    {
                        "name": "my.timestamp",
                        "type": "datetime",
                        "fast": true
                    }
                ],
                "timestamp_field": ".my.timestamp"
            }"#,
            ),
            "timestamp field `.my.timestamp` should not start with a `.`",
        );
        // Leading dot, escaped: still rejected.
        assert_eq!(
            deser_err(
                r#"{
                "field_mappings": [
                    {
                        "name": "my.timestamp",
                        "type": "datetime",
                        "fast": true
                    }
                ],
                "timestamp_field": "\\.my\\.timestamp"
            }"#,
            ),
            "timestamp field `\\.my\\.timestamp` should not start with a `.`",
        )
    }

    #[test]
    fn test_timestamp_field_that_ends_with_dot_is_invalid() {
        // Helper: deserialize a mapper definition and return its error message.
        let deser_err = |mapper_json: &str| {
            serde_json::from_str::<DocMapper>(mapper_json)
                .unwrap_err()
                .to_string()
        };
        // Trailing dot, unescaped.
        assert_eq!(
            deser_err(
                r#"{
                    "timestamp_field": "my.timestamp."
                }"#,
            ),
            "timestamp field `my.timestamp.` should not end with a `.`",
        );
        // Trailing dot, escaped: still rejected.
        assert_eq!(
            deser_err(
                r#"{
                    "timestamp_field": "my\\.timestamp\\."
                }"#,
            ),
            "timestamp field `my\\.timestamp\\.` should not end with a `.`",
        )
    }

    #[test]
    fn test_tag_field_name_that_starts_with_dot_is_invalid() {
        // Helper: deserialize a mapper definition and return its error message.
        let deser_err = |mapper_json: &str| {
            serde_json::from_str::<DocMapper>(mapper_json)
                .unwrap_err()
                .to_string()
        };
        // Leading dot, unescaped.
        assert_eq!(
            deser_err(
                r#"{
                    "tag_fields": [".my.tag"]
                }"#,
            ),
            "tag field `.my.tag` should not start with a `.`",
        );
        // Leading dot, escaped: still rejected.
        assert_eq!(
            deser_err(
                r#"{
                    "tag_fields": ["\\.my\\.tag"]
                }"#,
            ),
            "tag field `\\.my\\.tag` should not start with a `.`",
        )
    }

    #[test]
    fn test_tag_field_name_that_ends_with_dot_is_invalid() {
        // Helper: deserialize a mapper definition and return its error message.
        let deser_err = |mapper_json: &str| {
            serde_json::from_str::<DocMapper>(mapper_json)
                .unwrap_err()
                .to_string()
        };
        // Trailing dot, unescaped.
        assert_eq!(
            deser_err(
                r#"{
                    "tag_fields": ["my.tag."]
                }"#,
            ),
            "tag field `my.tag.` should not end with a `.`",
        );
        // Trailing dot, escaped: still rejected.
        assert_eq!(
            deser_err(
                r#"{
                    "tag_fields": ["my\\.tag\\."]
                }"#,
            ),
            "tag field `my\\.tag\\.` should not end with a `.`",
        )
    }

    #[test]
    fn test_fail_to_build_doc_mapper_with_timestamp_field_with_multivalues_cardinality() {
        // An array-typed timestamp field must be rejected at build time.
        let mapper_json = r#"{
            "timestamp_field": "timestamp",
            "tag_fields": [],
            "field_mappings": [
                {
                    "name": "timestamp",
                    "type": "array<i64>"
                }
            ]
        }"#;
        let build_err = serde_json::from_str::<DocMapperBuilder>(mapper_json)
            .unwrap()
            .try_build()
            .unwrap_err();
        assert_eq!(
            build_err.to_string(),
            "timestamp field `timestamp` should be a datetime field"
        );
    }

    #[test]
    fn test_fail_to_build_doc_mapper_with_non_fast_timestamp_field() {
        // The timestamp field must be a fast field; `"fast": false` is rejected.
        let mapper_json = r#"{
            "default_search_fields": [],
            "timestamp_field": "timestamp",
            "tag_fields": [],
            "field_mappings": [
                {
                    "name": "timestamp",
                    "type": "datetime",
                    "fast": false
                }
            ]
        }"#;
        let build_err = serde_json::from_str::<DocMapperBuilder>(mapper_json)
            .unwrap()
            .try_build()
            .unwrap_err();
        assert_eq!(
            build_err.to_string(),
            "timestamp field `timestamp` should be a fast field"
        );
    }

    #[test]
    fn test_fail_to_build_doc_mapper_with_duplicate_fields() {
        // Helper: parse a mapper definition and return the build error message.
        let build_err = |mapper_json: &str| {
            serde_json::from_str::<DocMapperBuilder>(mapper_json)
                .unwrap()
                .try_build()
                .unwrap_err()
                .to_string()
        };
        // Duplicate at the top level.
        assert_eq!(
            build_err(
                r#"{
                "field_mappings": [
                    {"name": "body","type": "text"},
                    {"name": "body","type": "bytes"}
                ]
            }"#
            ),
            "duplicated field definition `body`"
        );
        // Duplicate nested inside an object.
        assert_eq!(
            build_err(
                r#"{
                "field_mappings": [
                    {
                        "name": "identity",
                        "type": "object",
                        "field_mappings": [
                            {"type": "text", "name": "username"},
                            {"type": "text", "name": "username"}
                        ]
                    },
                    {"type": "text", "name": "body"}
                ]
            }"#
            ),
            "duplicated field definition `username`"
        );
    }

    #[test]
    fn test_should_build_doc_mapper_with_duplicate_fields_at_different_level() {
        // The same field name may appear at different nesting levels.
        let mapper_json = r#"{
            "field_mappings": [
                {
                    "name": "identity",
                    "type": "object",
                    "field_mappings": [
                        {"type": "text", "name": "body"},
                        {"type": "text", "name": "username"}
                    ]
                },
                {"type": "text", "name": "body"}
            ]
        }"#;
        let builder = serde_json::from_str::<DocMapperBuilder>(mapper_json).unwrap();
        assert!(builder.try_build().is_ok());
    }

    #[test]
    fn test_fail_to_build_doc_mapper_with_multivalued_timestamp_field() {
        // `array<datetime>` cannot serve as the timestamp field, even when fast.
        let mapper_json = r#"{
            "default_search_fields": [],
            "timestamp_field": "timestamp",
            "tag_fields": [],
            "field_mappings": [
                {
                    "name": "timestamp",
                    "type": "array<datetime>",
                    "fast": true
                }
            ]
        }"#;

        let build_err = serde_json::from_str::<DocMapperBuilder>(mapper_json)
            .unwrap()
            .try_build()
            .unwrap_err();
        assert_eq!(
            build_err.to_string(),
            "timestamp field `timestamp` should be single-valued"
        );
    }

    #[test]
    fn test_fail_with_field_name_equal_to_source() {
        // `_source` is reserved for Quickwit internals and cannot be user-declared.
        let mapper_json = r#"{
            "default_search_fields": [],
            "tag_fields": [],
            "field_mappings": [
                {
                    "name": "_source",
                    "type": "i64"
                }
            ]
        }"#;
        let deser_err = serde_json::from_str::<DocMapperBuilder>(mapper_json).unwrap_err();
        assert!(
            deser_err
                .to_string()
                .contains("the following fields are reserved for Quickwit internal usage")
        );
    }

    #[test]
    fn test_fail_to_parse_document_with_wrong_base64_value() -> anyhow::Result<()> {
        // Invalid base64 in a bytes field must yield a descriptive parse error.
        let mapper_json = r#"{
            "default_search_fields": [],
            "timestamp_field": null,
            "tag_fields": [],
            "field_mappings": [
                {
                    "name": "image",
                    "type": "bytes",
                    "stored": true
                }
            ]
        }"#;
        let doc_mapper = serde_json::from_str::<DocMapperBuilder>(mapper_json)?.try_build()?;
        let parse_err = doc_mapper
            .doc_from_json_str(
                r#"{
            "image": "invalid base64 data"
        }"#,
            )
            .unwrap_err();
        assert_eq!(
            parse_err.to_string(),
            "the field `image` could not be parsed: expected base64 string, got \
             `invalid base64 data`: Invalid symbol 32, offset 7."
        );
        Ok(())
    }

    #[test]
    fn test_parse_document_with_tag_fields() {
        // Parse a document against a mapper with a tag field and check the
        // stored values, the `_source` copy, and the field-presence hashes.
        let mapper_json = r#"{
            "default_search_fields": [],
            "index_field_presence": true,
            "timestamp_field": null,
            "tag_fields": ["city"],
            "store_source": true,
            "field_mappings": [
                {
                    "name": "city",
                    "type": "text",
                    "stored": true,
                    "tokenizer": "raw"
                },
                {
                    "name": "image",
                    "type": "bytes",
                    "stored": true
                }
            ]
        }"#;

        let doc_mapper = serde_json::from_str::<DocMapperBuilder>(mapper_json)
            .unwrap()
            .try_build()
            .unwrap();
        let schema = doc_mapper.schema();
        let json_doc: JsonValue = serde_json::json!({
            "city": "tokio",
            "image": "YWJj"
        });
        let (_, document) = doc_mapper
            .doc_from_json_obj(json_doc.as_object().unwrap().clone(), 0)
            .unwrap();

        // 2 properties, + 1 value for "_source" + 2 for field presence.
        assert_eq!(document.len(), 5);
        let expected_values_by_path: HashMap<String, JsonValue> = serde_json::from_str(
            r#"{
                "city": ["tokio"],
                "image": ["YWJj"]
            }"#,
        )
        .unwrap();
        let mut presence_hashes: HashSet<u64> = HashSet::default();
        for (field, field_value) in document.field_values() {
            let owned: OwnedValue = field_value.into();
            let field_name = schema.get_field_name(field);
            if field_name == SOURCE_FIELD_NAME {
                assert_eq!(
                    tantivy::schema::OwnedValue::from(field_value.as_value()),
                    tantivy::schema::OwnedValue::from(json_doc.as_object().unwrap().clone())
                );
            } else if field_name == FIELD_PRESENCE_FIELD_NAME {
                presence_hashes.insert(field_value.as_u64().unwrap());
            } else {
                let serialized = serde_json::to_string(&owned).unwrap();
                let found = expected_values_by_path[field_name]
                    .as_array()
                    .unwrap()
                    .iter()
                    .any(|expected| format!("{expected}") == serialized);
                assert!(found);
            }
        }
        assert_eq!(presence_hashes.len(), 2);
        let city_field = schema.get_field("city").unwrap();
        let image_field = schema.get_field("image").unwrap();
        assert!(presence_hashes.contains(&PathHasher::hash_path(&[
            &city_field.field_id().to_le_bytes()
        ])));
        assert!(presence_hashes.contains(&PathHasher::hash_path(&[
            &image_field.field_id().to_le_bytes()
        ])));
    }

    #[test]
    fn test_partition_key_in_tags() {
        // Fields referenced by the partition key are implicitly added to the tags.
        let mapper_json = r#"{
            "default_search_fields": [],
            "timestamp_field": null,
            "tag_fields": ["city"],
            "store_source": true,
            "partition_key": "hash_mod((service,division,city), 50)",
            "field_mappings": [
                {
                    "name": "city",
                    "type": "text",
                    "stored": true,
                    "tokenizer": "raw"
                },
                {
                    "name": "division",
                    "type": "text",
                    "stored": true,
                    "tokenizer": "raw"
                },
                {
                    "name": "service",
                    "type": "text",
                    "stored": true,
                    "tokenizer": "raw"
                }
            ]
        }"#;

        let doc_mapper = serde_json::from_str::<DocMapperBuilder>(mapper_json)
            .unwrap()
            .try_build()
            .unwrap();
        let tag_fields = Vec::from_iter(doc_mapper.tag_field_names);
        assert_eq!(tag_fields, vec!["city", "division", "service"]);
    }

    #[test]
    fn test_partition_key_in_tags_without_explicit_tags() {
        // Even with no explicit `tag_fields`, every field referenced by the
        // partition key becomes a tag field.
        let mapper_json = r#"{
            "default_search_fields": [],
            "timestamp_field": null,
            "store_source": true,
            "partition_key": "service,hash_mod((division,city), 50)",
            "field_mappings": [
                {
                    "name": "city",
                    "type": "text",
                    "stored": true,
                    "tokenizer": "raw"
                },
                {
                    "name": "division",
                    "type": "text",
                    "stored": true,
                    "tokenizer": "raw"
                },
                {
                    "name": "service",
                    "type": "text",
                    "stored": true,
                    "tokenizer": "raw"
                }
            ]
        }"#;

        let doc_mapper = serde_json::from_str::<DocMapperBuilder>(mapper_json)
            .unwrap()
            .try_build()
            .unwrap();
        let tag_fields = Vec::from_iter(doc_mapper.tag_field_names);
        assert_eq!(tag_fields, vec!["city", "division", "service"]);
    }

    #[test]
    fn test_build_doc_mapper_with_tag_field_with_dots_in_its_name() {
        // An escaped dotted name in `tag_fields` resolves to the literal field name.
        let mapper_json = r#"{
            "default_search_fields": [],
            "tag_fields": ["my\\.city\\.id"],
            "field_mappings": [
                {
                    "name": "my.city.id",
                    "type": "u64"
                }
            ]
        }"#;
        serde_json::from_str::<DocMapper>(mapper_json).unwrap();
    }

    #[test]
    fn test_build_doc_mapper_with_tag_field_in_object() {
        // A dotted path in `tag_fields` may target a field nested in an object.
        let mapper_json = r#"{
            "default_search_fields": [],
            "tag_fields": ["location.city"],
            "field_mappings": [
                {
                    "name": "location",
                    "type": "object",
                    "field_mappings": [
                        {
                            "name": "city",
                            "type": "u64"
                        }
                    ]
                }
            ]
        }"#;
        serde_json::from_str::<DocMapper>(mapper_json).unwrap();
    }

    #[test]
    fn test_fail_to_build_doc_mapper_with_wrong_tag_fields_types() -> anyhow::Result<()> {
        // A text tag field must use the `raw` tokenizer.
        let doc_mapper_one = r#"{
            "default_search_fields": [],
            "tag_fields": ["city"],
            "field_mappings": [
                {
                    "name": "city",
                    "type": "text"
                }
            ]
        }"#;
        let build_err_one = serde_json::from_str::<DocMapperBuilder>(doc_mapper_one)?
            .try_build()
            .unwrap_err();
        assert_eq!(
            build_err_one.to_string(),
            "tags collection is only allowed on text fields with the `raw` tokenizer",
        );

        // A bytes field can never be a tag field.
        let doc_mapper_two = r#"{
            "default_search_fields": [],
            "tag_fields": ["photo"],
            "field_mappings": [
                {
                    "name": "photo",
                    "type": "bytes"
                }
            ]
        }"#;
        let build_err_two = serde_json::from_str::<DocMapperBuilder>(doc_mapper_two)?
            .try_build()
            .unwrap_err();
        assert_eq!(
            build_err_two.to_string(),
            "tags collection is not allowed on `bytes` fields",
        );
        Ok(())
    }

    // See #1132
    #[test]
    fn test_by_default_store_source_is_false_and_fields_are_stored_individually() {
        // Without `store_source`, there is no `_source` field, but each mapped
        // field is stored on its own.
        let mapper_json = r#"{
            "default_search_fields": [],
            "field_mappings": [
                {
                    "name": "my-field",
                    "type": "u64",
                    "indexed": true
                }
            ]
        }"#;
        let doc_mapper = serde_json::from_str::<DocMapperBuilder>(mapper_json)
            .unwrap()
            .try_build()
            .unwrap();
        assert!(doc_mapper.source_field.is_none());
        let schema = doc_mapper.schema();
        let my_field = schema.get_field("my-field").unwrap();
        assert!(schema.get_field_entry(my_field).is_stored());
    }

    #[test]
    fn test_lenient_mode_schema() {
        // Lenient mode produces a single-field schema and no default search fields.
        let doc_mapper: DocMapper = serde_json::from_str(r#"{ "mode": "lenient" }"#).unwrap();
        assert_eq!(doc_mapper.schema().num_fields(), 1);
        assert!(doc_mapper.default_search_field_names.is_empty());
    }

    #[test]
    fn test_dynamic_mode_schema() {
        // Dynamic mode adds a json-typed dynamic field to the schema.
        let doc_mapper: DocMapper = serde_json::from_str(r#"{ "mode": "dynamic" }"#).unwrap();
        let schema = doc_mapper.schema();
        assert_eq!(schema.num_fields(), 2);
        let dynamic_field = schema.get_field(DYNAMIC_FIELD_NAME).unwrap();
        let dynamic_field_entry = schema.get_field_entry(dynamic_field);
        assert_eq!(dynamic_field_entry.field_type().value_type(), Type::Json);
        // the dynamic field will be added implicitly at search time.
        assert!(doc_mapper.default_search_field_names.is_empty());
    }

    #[test]
    fn test_dynamic_mode_schema_not_indexed() {
        // A dynamic mapping with `indexed: false` must produce a non-indexed
        // json field and no default search fields.
        let default_doc_mapper: DocMapper = serde_json::from_str(
            r#"{
            "mode": "dynamic",
            "dynamic_mapping": {
                "indexed": false,
                "stored": true
            }
        }"#,
        )
        .unwrap();
        let schema = default_doc_mapper.schema();
        assert_eq!(schema.num_fields(), 2);
        let dynamic_field = schema.get_field(DYNAMIC_FIELD_NAME).unwrap();
        let dynamic_field_entry = schema.get_field_entry(dynamic_field);
        let FieldType::JsonObject(json_opt) = dynamic_field_entry.field_type() else {
            panic!("Expected a json object");
        };
        assert!(!json_opt.is_indexed());
        // Fix: the original computed `is_empty()` and discarded the result,
        // so this final condition was never actually asserted.
        assert!(default_doc_mapper.default_search_field_names.is_empty());
    }

    #[test]
    fn test_strict_mode_simple() {
        // Strict mode rejects any field absent from the mapping.
        let doc_mapper: DocMapper = serde_json::from_str(r#"{ "mode": "strict" }"#).unwrap();
        let parsing_err = doc_mapper
            .doc_from_json_str(r#"{ "a": { "b": 5, "c": 6 } }"#)
            .unwrap_err();
        assert!(
            matches!(parsing_err, DocParsingError::NoSuchFieldInSchema(field_name) if field_name == "a")
        );
    }

    #[test]
    fn test_strict_mode_inner() {
        // Strict mode also applies inside objects: unmapped children are rejected
        // with their full dotted path.
        let doc_mapper: DocMapper = serde_json::from_str(
            r#"{
            "field_mappings": [
                {
                    "name": "some_obj",
                    "type": "object",
                    "field_mappings": [
                        {
                            "name": "child_a",
                            "type": "text"
                        }
                    ]
                }
            ],
            "mode": "strict"
        }"#,
        )
        .unwrap();
        // A fully mapped document is accepted.
        assert!(
            doc_mapper
                .doc_from_json_str(r#"{ "some_obj": { "child_a": "hello" } }"#)
                .is_ok()
        );
        // An unmapped nested field is rejected.
        let parsing_err = doc_mapper
            .doc_from_json_str(r#"{ "some_obj": { "child_a": "hello", "child_b": 6 } }"#)
            .unwrap_err();
        assert!(
            matches!(parsing_err, DocParsingError::NoSuchFieldInSchema(field_name) if field_name == "some_obj.child_b")
        );
    }

    #[test]
    fn test_lenient_mode_simple() {
        // Lenient mode silently drops unmapped fields: nothing gets indexed here.
        let doc_mapper: DocMapper = serde_json::from_str(r#"{ "mode": "lenient" }"#).unwrap();
        let (_, doc) = doc_mapper
            .doc_from_json_str(r#"{ "a": { "b": 5, "c": 6 } }"#)
            .unwrap();
        assert_eq!(doc.len(), 0);
    }

    /// Parses `document_json` with the mapper described by `doc_mapper_json`
    /// and asserts that `field` holds exactly `expected_values`, in order.
    #[track_caller]
    fn test_doc_from_json_test_aux(
        doc_mapper_json: &str,
        field: &str,
        document_json: &str,
        expected_values: Vec<TantivyValue>,
    ) {
        let doc_mapper: DocMapper = serde_json::from_str(doc_mapper_json).unwrap();
        let schema = doc_mapper.schema();
        let target_field = schema.get_field(field).unwrap();
        let (_, doc) = doc_mapper.doc_from_json_str(document_json).unwrap();

        let actual_values: Vec<OwnedValue> =
            doc.get_all(target_field).map(Into::into).collect();
        assert_eq!(actual_values.len(), expected_values.len());

        for (actual, expected) in actual_values.into_iter().zip(expected_values) {
            assert_eq!(actual, expected);
        }
    }

    #[test]
    fn test_dymamic_mode_simple() {
        // In dynamic mode, unmapped fields land in the dynamic field as one
        // json value.
        let expected = json!({
            "a": {
                "b": 5,
                "c": 6
            }
        });
        test_doc_from_json_test_aux(
            r#"{ "mode": "dynamic" }"#,
            DYNAMIC_FIELD_NAME,
            r#"{ "a": { "b": 5, "c": 6 } }"#,
            vec![expected.into()],
        );
    }

    #[test]
    fn test_dymamic_mode_inner() {
        // Mapped fields are consumed; only the leftover (unmapped) parts of the
        // document end up in the dynamic field.
        let expected = json!({
            "some_obj": {
                "child_b": {
                    "c": 3
                }
            },
            "some_obj2": 4
        });
        test_doc_from_json_test_aux(
            r#"{
                "field_mappings": [
                    {
                        "name": "some_obj",
                        "type": "object",
                        "field_mappings": [
                            {
                                "name": "child_a",
                                "type": "text"
                            }
                        ]
                    }
                ],
                "mode": "dynamic"
            }"#,
            DYNAMIC_FIELD_NAME,
            r#"{ "some_obj": { "child_a": "", "child_b": {"c": 3} }, "some_obj2": 4 }"#,
            vec![expected.into()],
        );
    }

    #[test]
    fn test_json_object_in_mapping() {
        // A json-typed field nested in an object captures arbitrary json values.
        let expected = json!({
            "hello": 2
        });
        test_doc_from_json_test_aux(
            r#"{
                "field_mappings": [
                    {
                        "name": "some_obj",
                        "type": "object",
                        "field_mappings": [
                            {
                                "name": "json_obj",
                                "type": "json"
                            }
                        ]
                    }
                ],
                "mode": "strict"
            }"#,
            "some_obj.json_obj",
            r#"{ "some_obj": { "json_obj": {"hello": 2} } }"#,
            vec![expected.into()],
        );
    }

    #[test]
    fn test_reject_invalid_concatenate_field() {
        // Helper: deserialize a mapper definition and return its error message.
        let deser_err = |mapper_json: &str| {
            serde_json::from_str::<DocMapper>(mapper_json)
                .unwrap_err()
                .to_string()
        };
        // Referencing a field that does not exist.
        assert!(
            deser_err(
                r#"{
                "field_mappings": [
                    {
                        "name": "concat",
                        "type": "concatenate",
                        "concatenate_fields": ["inexistent_field"]
                    }
                ]
            }"#
            )
            .contains("uses an unknown field")
        );
        // `include_dynamic_fields` requires a dynamic index.
        assert!(
            deser_err(
                r#"{
                "field_mappings": [
                    {
                        "name": "concat",
                        "type": "concatenate",
                        "include_dynamic_fields": true
                    }
                ],
                "mode": "strict"
            }"#
            )
            .contains(
                "concatenate field has `include_dynamic_fields` set, but index isn't dynamic"
            )
        );
        // A concatenate field with no sub-field at all.
        assert!(
            deser_err(
                r#"{
                "field_mappings": [
                    {
                        "name": "concat",
                        "type": "concatenate"
                    }
                ]
            }"#
            )
            .contains("concatenate type must have at least one sub-field")
        );
    }

    #[test]
    fn test_concatenate_field_in_default_field() {
        // A concatenate field may be listed among the default search fields.
        let mapper_json = r#"{
                "default_search_fields": ["concat"],
                "field_mappings": [
                    {
                        "name": "some_text",
                        "type": "text"
                    },
                    {
                        "name": "concat",
                        "type": "concatenate",
                        "concatenate_fields": ["some_text"]
                    }
                ],
                "mode": "strict"
            }"#;
        serde_json::from_str::<DocMapper>(mapper_json).unwrap();
    }

    #[test]
    fn test_concatenate_field_in_mapping() {
        // A text value from an explicitly mapped field must also be indexed
        // into the concatenate field that references it.
        let doc_mapping = r#"{
                "field_mappings": [
                    {
                        "name": "some_text",
                        "type": "text"
                    },
                    {
                        "name": "concat",
                        "type": "concatenate",
                        "concatenate_fields": ["some_text"]
                    }
                ],
                "mode": "strict"
            }"#;
        let raw_doc = r#"{"some_text": "this is a text"}"#;
        test_doc_from_json_test_aux(doc_mapping, "concat", raw_doc, vec!["this is a text".into()]);
    }

    #[test]
    fn test_concatenate_field_in_mapping_dynamic() {
        // With `include_dynamic_fields` set, values captured by the dynamic
        // mapping must also be indexed into the concatenate field.
        //
        // The doc mapping was previously duplicated verbatim for both cases;
        // it is shared here so the two sub-cases cannot drift apart.
        const DOC_MAPPING: &str = r#"{
                "field_mappings": [
                    {
                        "name": "concat",
                        "type": "concatenate",
                        "include_dynamic_fields": true
                    }
                ],
                "mode": "dynamic"
            }"#;
        // A single dynamic field.
        test_doc_from_json_test_aux(
            DOC_MAPPING,
            "concat",
            r#"{"other_field": "this is a text"}"#,
            vec!["this is a text".into()],
        );
        // Several dynamic fields: every value ends up in the concatenate field.
        test_doc_from_json_test_aux(
            DOC_MAPPING,
            "concat",
            r#"{"first_field": "this is a text", "second_field": "this is a text field too"}"#,
            vec!["this is a text".into(), "this is a text field too".into()],
        );
    }

    #[test]
    fn test_concatenate_field_in_mapping_integer() {
        // u64 values must reach the concatenate field both when the source
        // field is explicitly mapped and when it comes from the dynamic mode.
        let strict_mapping = r#"{
                "field_mappings": [
                    {
                        "name": "some_int",
                        "type": "u64"
                    },
                    {
                        "name": "concat",
                        "type": "concatenate",
                        "concatenate_fields": ["some_int"]
                    }
                ],
                "mode": "strict"
            }"#;
        let dynamic_mapping = r#"{
                "field_mappings": [
                    {
                        "name": "concat",
                        "type": "concatenate",
                        "include_dynamic_fields": true
                    }
                ],
                "mode": "dynamic"
            }"#;
        let raw_doc = r#"{"some_int": 25}"#;
        test_doc_from_json_test_aux(strict_mapping, "concat", raw_doc, vec![25_u64.into()]);
        test_doc_from_json_test_aux(dynamic_mapping, "concat", raw_doc, vec![25_u64.into()]);
    }

    #[test]
    fn test_concatenate_field_in_mapping_boolean() {
        // Boolean values must reach the concatenate field both from an
        // explicit mapping and from the dynamic mapping.
        let strict_mapping = r#"{
                "field_mappings": [
                    {
                        "name": "some_bool",
                        "type": "bool"
                    },
                    {
                        "name": "concat",
                        "type": "concatenate",
                        "concatenate_fields": ["some_bool"]
                    }
                ],
                "mode": "strict"
            }"#;
        let dynamic_mapping = r#"{
                "field_mappings": [
                    {
                        "name": "concat",
                        "type": "concatenate",
                        "include_dynamic_fields": true
                    }
                ],
                "mode": "dynamic"
            }"#;
        test_doc_from_json_test_aux(
            strict_mapping,
            "concat",
            r#"{"some_bool": false}"#,
            vec![false.into()],
        );
        test_doc_from_json_test_aux(
            dynamic_mapping,
            "concat",
            r#"{"some_bool": true}"#,
            vec![true.into()],
        );
    }

    #[test]
    fn test_concatenate_field_array() {
        // Every element of an array field must be indexed into the
        // concatenate field, in order.
        let doc_mapping = r#"{
                "field_mappings": [
                    {
                        "name": "some_text",
                        "type": "array<text>"
                    },
                    {
                        "name": "concat",
                        "type": "concatenate",
                        "concatenate_fields": ["some_text"]
                    }
                ],
                "mode": "strict"
            }"#;
        let raw_doc = r#"{"some_text": ["this is a text", "this is a text too"]}"#;
        test_doc_from_json_test_aux(
            doc_mapping,
            "concat",
            raw_doc,
            vec!["this is a text".into(), "this is a text too".into()],
        );
    }

    #[test]
    fn test_concatenate_multiple_field() {
        // A concatenate field referencing several source fields must collect
        // the values of all of them.
        let doc_mapping = r#"{
                "field_mappings": [
                    {
                        "name": "some_text",
                        "type": "text"
                    },
                    {
                        "name": "other_text",
                        "type": "text"
                    },
                    {
                        "name": "concat",
                        "type": "concatenate",
                        "concatenate_fields": ["some_text", "other_text"]
                    }
                ],
                "mode": "strict"
            }"#;
        let raw_doc = r#"{"some_text": "this is a text", "other_text": "this is a text too"}"#;
        test_doc_from_json_test_aux(
            doc_mapping,
            "concat",
            raw_doc,
            vec!["this is a text".into(), "this is a text too".into()],
        );
    }

    #[test]
    fn test_concatenate_field_object() {
        // A concatenate field may target a json field nested inside an object
        // mapping; the leaf values of the json payload are what gets indexed.
        let doc_mapping = r#"{
                "field_mappings": [
                    {
                        "name": "some_obj",
                        "type": "object",
                        "field_mappings": [
                            {
                                "name": "json_obj",
                                "type": "json"
                            }
                        ]
                    },
                    {
                        "name": "concat",
                        "type": "concatenate",
                        "concatenate_fields": ["some_obj.json_obj"]
                    }
                ],
                "mode": "strict"
            }"#;
        let raw_doc = r#"{ "some_obj": { "json_obj": {"hello": "world"} } }"#;
        test_doc_from_json_test_aux(doc_mapping, "concat", raw_doc, vec!["world".into()]);
    }

    /*
     * In the future we may want to make this work. Currently it isn't supported and fails at
     * index creation.
    #[test]
    fn test_concatenate_field_json_subpath() {
        test_doc_from_json_test_aux(
            r#"{
                "field_mappings": [
                    {
                        "name": "json_obj",
                        "type": "json"
                    },
                    {
                        "name": "concat",
                        "type": "concatenate",
                        "concatenate_fields": ["json_obj.hello"]
                    }
                ],
                "mode": "strict"
            }"#,
            "concat",
            r#"{ "json_obj": { "hello": "1", "world": "2"} }"#,
            vec!["1".into()],
        );
    }
    */

    #[test]
    fn test_concatenate_field_text() {
        // Two concatenate fields may reference the same source field; the
        // value must be searchable through each of them independently.
        //
        // The doc mapping was previously duplicated verbatim for both
        // lookups; it is shared here and the two concat fields are exercised
        // in a loop so the configs cannot drift apart.
        const DOC_MAPPING: &str = r#"{
                "field_mappings": [
                    {
                        "name": "some_text",
                        "type": "text"
                    },
                    {
                        "name": "concat1",
                        "type": "concatenate",
                        "concatenate_fields": ["some_text"]
                    },
                    {
                        "name": "concat2",
                        "type": "concatenate",
                        "concatenate_fields": ["some_text"]
                    }
                ],
                "mode": "strict"
            }"#;
        for concat_field in ["concat1", "concat2"] {
            test_doc_from_json_test_aux(
                DOC_MAPPING,
                concat_field,
                r#"{"some_text": "this is a text"}"#,
                vec!["this is a text".into()],
            );
        }
    }

    #[test]
    fn test_length_field() {
        // When `document_length` is enabled, the mapper records the byte
        // length of the raw JSON document under DOCUMENT_SIZE_FIELD_NAME.
        let raw_doc = r#"{ "some_obj": { "json_obj": {"hello": 2} } }"#;
        let expected_size = raw_doc.len() as u64;
        test_doc_from_json_test_aux(
            r#"{
                "document_length": true,
                "mode": "dynamic"
            }"#,
            DOCUMENT_SIZE_FIELD_NAME,
            raw_doc,
            vec![expected_size.into()],
        );
    }

    /// Parses `query` against the mapper's default search fields and returns
    /// the `Debug` rendering of the resulting tantivy query, or the error
    /// message as a `String`.
    fn default_doc_mapper_query_aux(doc_mapper: &DocMapper, query: &str) -> Result<String, String> {
        let user_input_ast = query_ast_from_user_text(query, None);
        let query_ast = user_input_ast
            .parse_user_query(doc_mapper.default_search_fields())
            .map_err(|err| err.to_string())?;
        match doc_mapper.query(doc_mapper.schema(), &query_ast, true) {
            Ok((query, _)) => Ok(format!("{query:?}")),
            Err(err) => Err(err.to_string()),
        }
    }

    #[test]
    fn test_doc_mapper_sub_field_query_on_non_json_field_should_error() {
        // `body` is a plain text field, so a dotted sub-path under it cannot
        // resolve to any field: the query must be rejected.
        let doc_mapper: DocMapper = serde_json::from_str(
            r#"{
            "field_mappings": [{"name": "body", "type": "text"}],
            "mode": "dynamic"
        }"#,
        )
        .unwrap();
        let error = default_doc_mapper_query_aux(&doc_mapper, "body.wrong_field:hello").unwrap_err();
        assert_eq!(
            error,
            "invalid query: field does not exist: `body.wrong_field`"
        );
    }

    #[test]
    fn test_doc_mapper_accept_sub_field_query_on_json_field() {
        // A dotted sub-path under a json field is valid: it becomes the json
        // path of the term query.
        let doc_mapper: DocMapper = serde_json::from_str(
            r#"{
            "field_mappings": [{"name": "body", "type": "json"}],
            "mode": "dynamic"
        }"#,
        )
        .unwrap();
        let query_repr = default_doc_mapper_query_aux(&doc_mapper, "body.dynamic_field:hello");
        assert_eq!(
            query_repr,
            Ok(
                r#"TermQuery(Term(field=2, type=Json, path=dynamic_field, type=Str, "hello"))"#
                    .to_string()
            )
        );
    }

    #[test]
    fn test_doc_mapper_object_dot_collision_with_object_field() {
        // `identity.username` is ambiguous: it can name the nested field inside
        // the `identity` object, or the top-level field literally called
        // "identity.username". Unescaped dots pick the nested field; an
        // escaped dot (`\.`) picks the literal field name.
        let doc_mapper: DocMapper = serde_json::from_str(
            r#"{
            "field_mappings": [
                {
                    "name": "identity",
                    "type": "object",
                    "field_mappings": [{"type": "text", "name": "username"}]
                },
                {"type": "text", "name": "identity.username"}
            ]
        }"#,
        )
        .unwrap();
        let nested = default_doc_mapper_query_aux(&doc_mapper, "identity.username:toto").unwrap();
        assert_eq!(nested, r#"TermQuery(Term(field=2, type=Str, "toto"))"#);
        let literal = default_doc_mapper_query_aux(&doc_mapper, r"identity\.username:toto").unwrap();
        assert_eq!(literal, r#"TermQuery(Term(field=3, type=Str, "toto"))"#);
    }

    #[test]
    fn test_doc_mapper_object_dot_collision_with_json_field() {
        // Same ambiguity as the object-field case, but with a json field:
        // unescaped dots resolve to a json path under `identity`, while an
        // escaped dot targets the field literally named "identity.username".
        let doc_mapper: DocMapper = serde_json::from_str(
            r#"{
            "field_mappings": [
                {"name": "identity", "type": "json"},
                {"type": "text", "name": "identity.username"}
            ]
        }"#,
        )
        .unwrap();
        let json_path = default_doc_mapper_query_aux(&doc_mapper, "identity.username:toto").unwrap();
        assert_eq!(
            json_path,
            r#"TermQuery(Term(field=2, type=Json, path=username, type=Str, "toto"))"#
        );
        let literal = default_doc_mapper_query_aux(&doc_mapper, r"identity\.username:toto").unwrap();
        assert_eq!(literal, r#"TermQuery(Term(field=3, type=Str, "toto"))"#);
    }

    #[test]
    fn test_doc_mapper_default_tokenizers() {
        // Without explicit tokenizer configuration, json fields default to the
        // raw tokenizer with basic indexing, while text fields use the
        // default tokenizer.
        let doc_mapper: DocMapper = serde_json::from_str(
            r#"{
            "field_mappings": [
                {"name": "json_field", "type": "json"},
                {"name": "text_field", "type": "text"}
            ]
        }"#,
        )
        .unwrap();
        let schema = doc_mapper.schema();

        let json_field = schema.get_field("json_field").unwrap();
        let json_options = match schema.get_field_entry(json_field).field_type() {
            FieldType::JsonObject(json_options) => json_options,
            _ => panic!(),
        };
        let text_indexing_options = json_options.get_text_indexing_options().unwrap();
        assert_eq!(text_indexing_options.tokenizer(), RAW_TOKENIZER_NAME);
        assert_eq!(
            text_indexing_options.index_option(),
            IndexRecordOption::Basic
        );

        let text_field = schema.get_field("text_field").unwrap();
        let text_options = match schema.get_field_entry(text_field).field_type() {
            FieldType::Str(text_options) => text_options,
            _ => panic!(),
        };
        assert_eq!(
            text_options.get_indexing_options().unwrap().tokenizer(),
            DEFAULT_TOKENIZER_NAME
        );
    }

    #[test]
    fn test_find_field_mapping_type() {
        // `find_field_mapping_type` must resolve nested object paths as well
        // as field names containing literal dots (escaped with `\.`).
        let mapper = serde_json::from_str::<DocMapper>(
            r#"{
            "field_mappings": [
                {
                    "name": "some_obj",
                    "type": "object",
                    "field_mappings": [
                        {
                            "name": "timestamp",
                            "type": "datetime",
                            "fast": true
                        },
                        {
                            "name": "object2",
                            "type": "object",
                            "field_mappings": [
                                {
                                    "name": "id",
                                    "type": "u64"
                                },
                                {
                                    "name": "my.id",
                                    "type": "u64"
                                }
                            ]
                        }
                    ]
                },
                {
                    "name": "my.timestamp",
                    "type": "datetime",
                    "fast": true
                }
            ]
        }"#,
        )
        .unwrap();
        for field_path in [
            "some_obj.timestamp",
            "some_obj.object2.id",
            "some_obj.object2",
            "some_obj.object2.my\\.id",
            "my\\.timestamp",
        ] {
            mapper
                .field_mappings
                .find_field_mapping_type(field_path)
                .unwrap();
        }
    }

    #[test]
    fn test_build_doc_mapper_with_custom_ngram_tokenizer() {
        // A custom ngram tokenizer must end up both in the text field's
        // indexing options and in the mapper's tokenizer manager.
        let mapper = serde_json::from_str::<DocMapper>(
            r#"{
            "tokenizers": [
                {
                    "name": "my_tokenizer",
                    "filters": ["lower_caser", "ascii_folding", "remove_long"],
                    "type": "ngram",
                    "min_gram": 3,
                    "max_gram": 5
                }
            ],
            "field_mappings": [
                {
                    "name": "my_text",
                    "type": "text",
                    "tokenizer": "my_tokenizer"
                }
            ]
        }"#,
        )
        .unwrap();
        let field_mapping_type = mapper
            .field_mappings
            .find_field_mapping_type("my_text")
            .unwrap();
        let super::FieldMappingType::Text(options, _) = &field_mapping_type else {
            panic!("Expected a text field");
        };
        assert!(options.indexing_options.is_some());
        let indexing_options = options.indexing_options.as_ref().unwrap();
        assert_eq!(indexing_options.tokenizer.name(), "my_tokenizer");
        assert!(
            mapper
                .tokenizer_manager()
                .get_tokenizer("my_tokenizer")
                .is_some()
        );
    }

    #[test]
    fn test_build_doc_mapper_should_fail_with_unknown_tokenizer() {
        // Referencing a tokenizer that is neither built-in nor declared in
        // `tokenizers` must fail at build time, not silently at index time.
        let build_error = serde_json::from_str::<DocMapperBuilder>(
            r#"{
            "field_mappings": [
                {
                    "name": "my_text",
                    "type": "text",
                    "tokenizer": "my_tokenizer"
                }
            ]
        }"#,
        )
        .unwrap()
        .try_build()
        .unwrap_err();
        assert!(build_error.to_string().contains("unknown tokenizer"));
    }

    #[test]
    fn test_build_doc_mapper_tokenizer_manager_with_custom_tokenizer() {
        // The custom ngram tokenizer (min 3, max 5, lowercased) must be
        // usable through the tokenizer manager and produce lowercased ngrams.
        let mapper = serde_json::from_str::<DocMapper>(
            r#"{
            "tokenizers": [
                {
                    "name": "my_tokenizer",
                    "filters": ["lower_caser"],
                    "type": "ngram",
                    "min_gram": 3,
                    "max_gram": 5
                }
            ],
            "field_mappings": [
                {
                    "name": "my_text",
                    "type": "text",
                    "tokenizer": "my_tokenizer"
                }
            ]
        }"#,
        )
        .unwrap();
        let mut tokenizer = mapper
            .tokenizer_manager()
            .get_tokenizer("my_tokenizer")
            .unwrap();
        let mut token_stream = tokenizer.token_stream("HELLO WORLD");
        for expected_token in ["hel", "hell", "hello"] {
            assert_eq!(token_stream.next().unwrap().text, expected_token);
        }
    }

    #[test]
    fn test_build_doc_mapper_with_custom_invalid_regex_tokenizer() {
        // A regex tokenizer with an invalid pattern ("(my_pattern" has an
        // unclosed group) must make `try_build` fail with a descriptive error.
        let mapper_builder = serde_json::from_str::<DocMapperBuilder>(
            r#"{
            "tokenizers": [
                {
                    "name": "my_tokenizer",
                    "type": "regex",
                    "pattern": "(my_pattern"
                }
            ],
            "field_mappings": [
                {
                    "name": "my_text",
                    "type": "text",
                    "tokenizer": "my_tokenizer"
                }
            ]
        }"#,
        )
        .unwrap();
        // `unwrap_err` already panics if the build unexpectedly succeeds, so
        // the previous separate `assert!(mapper.is_err())` was redundant.
        let error_msg = mapper_builder.try_build().unwrap_err().to_string();
        assert!(error_msg.contains("invalid regex tokenizer"));
    }

    #[test]
    fn test_doc_mapper_with_custom_tokenizer_equivalent_to_default() {
        // A custom "simple" tokenizer with the `remove_long` and `lower_caser`
        // filters should behave exactly like the built-in default tokenizer.
        //
        // NOTE: the previous config also carried `"min_gram": 3, "max_gram": 5`,
        // copy-pasted from the ngram tests; those parameters are meaningless
        // for a simple tokenizer (the test passed with them present, so they
        // were being ignored) and have been removed.
        let mapper = serde_json::from_str::<DocMapper>(
            r#"{
            "tokenizers": [
                {
                    "name": "my_tokenizer",
                    "filters": ["remove_long", "lower_caser"],
                    "type": "simple"
                }
            ],
            "field_mappings": [
                {
                    "name": "my_text",
                    "type": "text",
                    "tokenizer": "my_tokenizer"
                }
            ]
        }"#,
        )
        .unwrap();
        let mut default_tokenizer = mapper.tokenizer_manager().get_tokenizer("default").unwrap();
        let mut tokenizer = mapper
            .tokenizer_manager()
            .get_tokenizer("my_tokenizer")
            .unwrap();
        let text = "I've seen things... seen things you little people wouldn't believe.";
        let mut default_token_stream = default_tokenizer.token_stream(text);
        let mut token_stream = tokenizer.token_stream(text);
        // Compare the first 10 tokens produced by both tokenizers.
        for _ in 0..10 {
            assert_eq!(
                default_token_stream.next().unwrap().text,
                token_stream.next().unwrap().text
            );
        }
    }

    #[test]
    fn test_deserialize_doc_after_mapping_change_json_to_obj() {
        use serde::Deserialize;
        use tantivy::Document;

        // Index a document while `body` is mapped as a single `json` field...
        let old_mapper = json!({
            "field_mappings": [
                {"name": "body", "type": "json"}
            ]
        });

        let builder = DocMapperBuilder::deserialize(old_mapper.clone()).unwrap();
        let old_mapper = builder.try_build().unwrap();

        // Test document exercising dotted keys, nested objects, heterogeneous
        // arrays, and text arrays.
        let JsonValue::Object(doc) = json!({
            "body": {
                "field.1": "hola",
                "field2": {
                    "key": "val",
                    "arr": [1,"abc", {"k": "v"}],
                },
                "field3": ["a", "b"]
            }
        }) else {
            panic!();
        };
        let tantivy_doc = old_mapper.doc_from_json_obj(doc.clone(), 0).unwrap().1;
        let named_doc = tantivy_doc.to_named_doc(&old_mapper.schema());

        // ...then reinterpret the stored document with a new mapping where
        // `body` is an explicit object with typed sub-fields.
        let new_mapper = json!({
            "field_mappings": [
                {
                    "name": "body",
                    "type": "object",
                    "field_mappings": [
                        {"name": "field.1", "type": "text"},
                        {"name": "field2", "type": "json"},
                        {"name": "field3", "type": "array<text>"},
                    ]
                }
            ]
        });
        let builder = DocMapperBuilder::deserialize(new_mapper).unwrap();
        let new_mapper = builder.try_build().unwrap();

        // The round trip through the new mapping must reproduce the original
        // document exactly.
        assert_eq!(new_mapper.doc_to_json(named_doc.0).unwrap(), doc);
    }

    #[test]
    fn test_deserialize_doc_after_mapping_change_obj_to_json() {
        use serde::Deserialize;
        use tantivy::Document;

        // Mirror of test_deserialize_doc_after_mapping_change_json_to_obj:
        // index a document while `body` is an explicit object with typed
        // sub-fields...
        let old_mapper = json!({
            "field_mappings": [
                {
                    "name": "body",
                    "type": "object",
                    "field_mappings": [
                        {"name": "field.1", "type": "text"},
                        {"name": "field2", "type": "json"},
                        {"name": "field3", "type": "array<text>"},
                    ]
                }
            ]
        });

        let builder = DocMapperBuilder::deserialize(old_mapper.clone()).unwrap();
        let old_mapper = builder.try_build().unwrap();

        // Test document exercising dotted keys, nested objects, heterogeneous
        // arrays, and text arrays.
        let JsonValue::Object(doc) = json!({
            "body": {
                "field.1": "hola",
                "field2": {
                    "key": "val",
                    "arr": [1,"abc", {"k": "v"}],
                },
                "field3": ["a", "b"]
            }
        }) else {
            panic!();
        };
        let tantivy_doc = old_mapper.doc_from_json_obj(doc.clone(), 0).unwrap().1;
        let named_doc = tantivy_doc.to_named_doc(&old_mapper.schema());

        // ...then reinterpret the stored document with `body` collapsed into
        // a single `json` field.
        let new_mapper = json!({
            "field_mappings": [
                {"name": "body", "type": "json"}
            ]
        });
        let builder = DocMapperBuilder::deserialize(new_mapper).unwrap();
        let new_mapper = builder.try_build().unwrap();

        // The round trip through the new mapping must reproduce the original
        // document exactly.
        assert_eq!(new_mapper.doc_to_json(named_doc.0).unwrap(), doc);
    }
}
